package com.lyon.demo.protocol.api.remoting;

import cn.hutool.core.lang.Assert;
import cn.hutool.core.lang.Pair;
import com.lyon.demo.protocol.api.core.Command;
import com.lyon.demo.protocol.api.core.command.RemotingCommandType;
import com.lyon.demo.protocol.api.core.command.ResponseFuture;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;

/**
 * @author LeeYan9
 * @since 2022-05-19
 */
@SuppressWarnings("rawtypes")
@Slf4j
@Getter
public abstract class AbstractRemotingService<C, T extends Command> implements RemotingService<C, T> {

    protected Map<Integer, Pair<RequestProcessor<C, T>, ExecutorService>> requestProcessorTable = new ConcurrentHashMap<>();
    protected Map<RpcPredicate<T>, RpcHookProcessor<T>> rpcHookProcessorTable = new ConcurrentHashMap<>();
    protected Map<Long, ResponseFuture> responseTable = new ConcurrentHashMap<>();
    protected ExecutorService callbackExecutor;
    private String processorNotNulMsg = "请求处理器不能为空";


    @Override
    public void addRequestProcessor(RequestProcessor<C, T> requestProcessor, ExecutorService executorService) {

        Assert.notNull(requestProcessor, processorNotNulMsg);
        List<Integer> handlerCodes = requestProcessor.handlerCodes();
        Assert.notEmpty(handlerCodes, processorNotNulMsg);
        Pair<RequestProcessor<C, T>, ExecutorService> pair = new Pair<>(requestProcessor, executorService);
        handlerCodes.forEach(requestHandlerCode -> requestProcessorTable.put(requestHandlerCode, pair));
    }

    @Override
    public void addRpcHook(RpcHookProcessor<T> rpcHookProcessor) {
        Assert.notNull(rpcHookProcessor, "RpcPreHook处理器不能为空");
        Assert.notNull(rpcHookProcessor.predicate(), "RpcPreHook-Predicate 谓语判断不能为空");
        rpcHookProcessorTable.put(rpcHookProcessor.predicate(), rpcHookProcessor);
    }


    public void processRequestCommand(C ctx, T command) {
        switch (command.getCommandType()) {
            case RemotingCommandType.REQUEST:
                processRequest(ctx, command);
                break;
            case RemotingCommandType.RESPONSE:
                processResponse(ctx, command);
                break;
            default:
                log.warn("错误的command-type]");
                break;
        }
    }

    /**
     * 执行接收到的请求消息
     *
     * @param ctx     上下文
     * @param command 请求命令
     */
    protected abstract void processRequest(C ctx, T command);

    /**
     * 执行发送后的回执消息，在 同步请求时
     *
     * @param ctx     上下文
     * @param command 收到的命令行
     */
    protected abstract void processResponse(C ctx, T command);

    protected void doAfterHook(T command, T resp) {
        rpcHookProcessorTable.forEach((predicate, hookProcessor) -> {
            if (predicate.test(command)) {
                hookProcessor.postProcess(command, resp);
            }
        });
    }

    protected void doBeforeHook(T command) {
        rpcHookProcessorTable.forEach((predicate, hookProcessor) -> {
            if (predicate.test(command)) {
                hookProcessor.preProcess(command);
            }
        });
    }

    @Override
    public void setCallbackExecutor(ExecutorService callbackExecutor) {
        this.callbackExecutor = callbackExecutor;
    }
}
